package com.atguigu.gmall1118.app

import com.atguigu.gmall1118.bean.{TagInfo, TaskInfo, TaskTagRule}
import com.atguigu.gmall1118.constant.CodeConst
import com.atguigu.gmall1118.dao.{TagInfoDAO, TaskInfoDAO, TaskTagRuleDAO}
import com.atguigu.gmall1118.util.{MyPropertiesUtil, MysqlUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

object TaskSQLApp {

//任务1 ：根据标签的定义、SQL、规则等 ，读取数仓的数据写入到画像库中
  //1、读取在mysql中的标签定义：标签定义 任务定义  标签的值映射
  //
  //2 、 根据定义，创建一张标签表
  //
  //3、读取数仓的数据
  //
  //4、写入画像一个标签一张表   与（3）  可以用一个insert  select

  //1次运行 计算 一个标签
  def main(args: Array[String]): Unit = {

    //1、读取在mysql中的标签定义：标签定义 任务定义  标签的值映射
    //1个标签还是一堆标签？ 1个标签
    //  程序怎么知道应该运行哪个标签？ 画像平台会把taskid 作为其中一个参数 放到json中一起传递给远端提交器 ，
    //  远端提交器会把taskid 作为业务参数 通过spark-submit 传递给程序
    // 程序会根据main方法的参数 获得taskId  。 busiDate同理。
    //
    val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app")//.setMaster("local[*]")
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    val taskId: String = args(0)
    val busiDate:String =args(1)


    val tagInfo: TagInfo = TagInfoDAO.getTagInfoBytaskId(taskId)
    val taskInfo: TaskInfo = TaskInfoDAO.getTaskInfo(taskId)
    val taskTagRuleList: List[TaskTagRule] = TaskTagRuleDAO.getTaskTagRuleList(taskId)


    println(tagInfo)
    println(taskInfo)
    println(taskTagRuleList)

    // 动态建表 ：根据页面填写的信息 提取要素 在程序中自动创建表 hive的表
    //    create table   表名                   ：  tagCode 作为表名   创建画像库
    //  （ 字段 字段类型  ...）              ：  uid   string  ,    tag_value    $tagValueType
    //    分区字段                                 :     dt
    //   格式 压缩                                 :    考虑数据量不大 字段少  考虑性能  采用 文本格式 不压缩
    //  存储位置                                    ：  hdfs路径/库/表


    //
    val properties: Properties = MyPropertiesUtil.load("config.properties")
    val hdfsPath: String = properties.getProperty("hdfs-store.path")
    val gmallDbName: String = properties.getProperty("data-warehouse.dbname")
    val upDbName: String = properties.getProperty("user-profile.dbname")

    val tableName= tagInfo.tagCode.toLowerCase

    val tagValueType: String = tagInfo.tagValueType match {
      case CodeConst.TAG_VALUE_TYPE_LONG => "bigint"
      case CodeConst.TAG_VALUE_TYPE_DECIMAL => "decimal(16,2)"
      case CodeConst.TAG_VALUE_TYPE_STRING => "string"
      case CodeConst.TAG_VALUE_TYPE_DATE => "string"
    }


    val createTableSQL=
      s"""
         |  create table if not exists $upDbName.$tableName
         |  (uid  string , tag_value $tagValueType)
         |  partitioned by (dt string)
         |  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
         |  location '$hdfsPath/$upDbName/$tableName'
         |""".stripMargin

    println(createTableSQL)
    sparkSession.sql(createTableSQL)


    //3、读取数仓的数据
    //依据 任务taskInfo中的查询sql
    //根据taskTagRule  转换queryValue 为 tagvalue

    //case query_value when 'F' then '女' when 'M' then '男' .....

    //3.1 解决两个兼容性问题
    //1  要兼容没有四级标签的 标签计算
    //  2   sql中的时间分区字段的值 如何传达
    var tagValueSql=""
    if(taskTagRuleList.size>0){ //有四级标签 则需要做映射
      val whenThenList: List[String] = taskTagRuleList.map(tagRule => s" when  '${tagRule.queryValue}' then  '${tagRule.subTagValue}'")
      val whenThenSQL: String = whenThenList.mkString(" ")

      val caseWhen=s" case query_value $whenThenSQL  end tag_value"
      tagValueSql=caseWhen
    }else{//无四级标签 ，直接用查询值当标签值
      tagValueSql=" query_value"
    }
    //用业务日期替换$dt
    taskInfo.taskSql=taskInfo.taskSql.replace("$dt",busiDate)
    val selectSQL = s" select uid , $tagValueSql from  (${taskInfo.taskSql} )  tg"
    println(selectSQL)

    //4 创建insert 语句
    val insertSQL=s" insert overwrite table $upDbName.$tableName partition ( dt ='$busiDate')  $selectSQL"
    println(insertSQL)
    sparkSession.sql(s"use $gmallDbName")
    sparkSession.sql(insertSQL)


  }

}
